package com.tl.spark.scala

/**
  * @program: spark-test
  * @description:
  * @author: dong.tl
  * @create: 2018-09-26 16:12
  **/

import java.io.ByteArrayInputStream

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
import org.dom4j.Document
import org.dom4j.io.SAXReader

import scala.util.control.NonFatal

/**
  * @program: spark-test
  * @description:
  * @author: dong.tl
  * @create: 2018-09-26 15:27
  **/
object Spark_RO_Hbase {

  /**
    * Spark job: full-scans the HBase table "file_data", keeps rows whose
    * "file_info:type" column equals "XML" (case-insensitive), parses the XML
    * payload stored in "file_info:bytes" with dom4j, extracts a fixed set of
    * attributes, and writes them as Puts into the HBase table "parse_xml".
    * Rows that fail to parse are reported to stdout instead of being written.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val sourceTable = "file_data"
    val targetTable = "parse_xml"

    // HBase configuration. ZooKeeper quorum/port are set programmatically
    // instead of relying on an hbase-site.xml on the classpath.
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    conf.set("hbase.zookeeper.property.clientPort", "2181") // default ZK port
    conf.set(TableInputFormat.INPUT_TABLE, sourceTable)
    conf.set(TableOutputFormat.OUTPUT_TABLE, targetTable)

    val sparkConf = new SparkConf().setAppName("RO_hbase ")
    // Master is intentionally not set here; supply it via spark-submit.
    val sc = new SparkContext(sparkConf)

    try {
      // Full scan of the source table as (rowkey, row) pairs.
      val rdd = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result]
      )

      // Output job. NOTE: the values we emit are Puts, so the output value
      // class must be Put (the previous code wrongly declared Result here).
      val job = Job.getInstance(conf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      // Target column family and the (qualifier -> XPath) pairs to extract.
      val family = "data"
      val fields: Seq[(String, String)] = Seq(
        "cn" -> "/business:PatentDocumentAndRelated/business:BibliographicData/business:PublicationReference[@dataFormat='standard']/base:DocumentID/base:WIPOST3Code/text()",
        "dateProduced" -> "/business:PatentDocumentAndRelated/@dateProduced",
        "keyID" -> "/business:PatentDocumentAndRelated/@keyID",
        "file" -> "/business:PatentDocumentAndRelated/@file",
        "fileReferenceId" -> "/business:PatentDocumentAndRelated/@fileReferenceId"
      )

      // Parse each XML row into Either[(rowkey, errorMessage), (key, Put)] so
      // the success/failure split is typed instead of relying on isInstanceOf
      // over an Any-typed pair (the previous approach).
      val parsed = rdd.filter { case (_, result) =>
        // Bytes.toString returns null when the column is absent;
        // "XML".equalsIgnoreCase(null) is false, so such rows are dropped
        // instead of throwing an NPE.
        val tp = Bytes.toString(result.getValue("file_info".getBytes, "type".getBytes))
        "XML".equalsIgnoreCase(tp)
      }.map { case (rowkey, result) =>
        val rk = Bytes.toString(rowkey.get())
        try {
          val reader = new SAXReader
          val doc: Document =
            reader.read(new ByteArrayInputStream(result.getValue("file_info".getBytes, "bytes".getBytes)))
          // Target rowkey is the part of the source rowkey before the first '|'.
          val put = new Put(Bytes.toBytes(rk.split("\\|")(0)))
          fields.foreach { case (qualifier, xpath) =>
            put.addColumn(
              Bytes.toBytes(family),
              Bytes.toBytes(qualifier),
              Bytes.toBytes(util.getStringValue(doc, xpath))
            )
          }
          Right((new ImmutableBytesWritable, put))
        } catch {
          // NonFatal: let OutOfMemoryError / InterruptedException propagate.
          case NonFatal(e) => Left((rk, String.valueOf(e.getMessage)))
        }
      }.cache() // reused twice below: once to save, once to report failures

      // Persist successfully parsed rows into the target table.
      parsed
        .collect { case Right(kv) => kv }
        .saveAsNewAPIHadoopDataset(job.getConfiguration)

      // Report parse failures (printed on the executors' stdout).
      parsed
        .collect { case Left((rk, msg)) => (rk, msg) }
        .foreach { case (rk, msg) => println(s"parse failed for rowkey $rk: $msg") }
    } finally {
      // Always release the Spark context, even if the job fails.
      sc.stop()
    }
  }
}

object util {

  /**
    * Evaluates an XPath expression against a dom4j document and returns the
    * string value of the first matching node.
    *
    * @param doc   parsed XML document to query
    * @param xpath XPath expression (may use namespace prefixes resolvable by
    *              the document); was previously misspelled "xptah"
    * @return string value of the first match, or "" when nothing matches
    */
  def getStringValue(doc: Document, xpath: String): String = {
    val nodes = doc.selectNodes(xpath)
    // Guard against the no-match case before indexing into the list.
    if (nodes.isEmpty) "" else nodes.get(0).getStringValue
  }
}
